package com.bigdata.wsr.map;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichMapFunction;

/**
 * Cleans a CDC change event and flattens it into a single row JSON string.
 *
 * <p>For a delete event ({@code op} equal to {@code "d"}) the {@code before} image is
 * used and {@code is_delete_doris} is set to 1. For every other operation the
 * {@code after} image is used and {@code is_delete_doris} is set to 0. The event's
 * {@code ts_ms}, {@code dbTable} and {@code nonTimeFields} values are then copied onto
 * the row under the keys {@code timestamp}, {@code dbTable} and {@code nonTimeFields}.
 *
 * @author rui.wang
 * @date 2022/10/20
 */
@Slf4j
public class CdcDataMap extends RichMapFunction<String, String> {
    /** Value of the {@code op} field that marks a delete event. */
    private static final String DELETE_OP = "d";
    /** Key of the delete marker written onto every emitted row. */
    private static final String DELETE_FLAG_KEY = "is_delete_doris";

    /**
     * Converts one CDC event into its flattened row representation.
     *
     * @param value the CDC event as a JSON string
     * @return the selected row image, enriched with the delete flag, timestamp,
     *         table name and non-time fields, serialized as a JSON string
     * @throws IllegalArgumentException if {@code value} is null/empty, or the event
     *         does not carry the row image required by its operation type
     * @throws Exception if {@code value} is not valid JSON
     */
    @Override
    public String map(String value) throws Exception {
        JSONObject event = JSONObject.parseObject(value);
        if (event == null) {
            // parseObject yields null for null/empty input; fail with context instead of a bare NPE.
            throw new IllegalArgumentException("CDC record is null or empty");
        }

        String operateType = event.getString("op");
        // Delete events carry the row in "before"; all other operations carry it in "after".
        boolean deleted = DELETE_OP.equals(operateType);
        String imageKey = deleted ? "before" : "after";

        // Read the nested object directly rather than serializing it to a string and re-parsing it.
        JSONObject row = event.getJSONObject(imageKey);
        if (row == null) {
            throw new IllegalArgumentException(
                    "CDC event has no '" + imageKey + "' image (op=" + operateType
                            + ", dbTable=" + event.getString("dbTable") + ")");
        }

        row.put(DELETE_FLAG_KEY, deleted ? 1 : 0);
        row.put("timestamp", event.getLongValue("ts_ms"));
        row.put("dbTable", event.getString("dbTable"));
        row.put("nonTimeFields", event.getJSONObject("nonTimeFields"));
        return row.toJSONString();
    }
}
